package com.ruoyi.flink.chapten5;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Small streaming job that demonstrates the {@code open}/{@code close} hooks
 * of a rich function alongside its regular {@code map} logic.
 */
public class Trans6RichFunctionTest {

	/**
	 * Builds a stream from three fixed {@link Event} elements, maps each one
	 * through {@link MyRichMapper}, prints the mapped values and runs the job.
	 *
	 * @param args command-line arguments (not used)
	 * @throws Exception if job execution fails
	 */
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
		// Environment-level parallelism; the map operator below sets its own value.
		environment.setParallelism(1);

		DataStreamSource<Event> events = environment.fromElements(
				new Event("Mary", "./home", 1000L),
				new Event("Bob", "./cart", 2000L),
				new Event("Alice", "./prod?id=100", 3000L));

		// The map operator's parallelism is set to 2 here, overriding the value above.
		MyRichMapper urlLengthMapper = new MyRichMapper();
		events.map(urlLengthMapper)
				.setParallelism(2)
				.print();

		environment.execute();
	}

	/**
	 * A custom rich function class: maps an {@link Event} to the length of its
	 * {@code url} field and prints a message from {@code open} and {@code close}
	 * that includes the index of the current subtask.
	 */
	public static class MyRichMapper extends RichMapFunction<Event, Integer> {

		/**
		 * Delegates to the superclass, then prints a start-up message carrying
		 * this subtask's index.
		 *
		 * @param parameters configuration forwarded to {@code super.open}
		 * @throws Exception if the superclass hook fails
		 */
		@Override
		public void open(Configuration parameters) throws Exception {
			super.open(parameters);
			int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
			System.out.println("当前是Open生命周期被调用" + subtaskIndex + "号任务启动");
		}

		/**
		 * Returns the length of the event's {@code url}.
		 *
		 * @param value the incoming event
		 * @return {@code value.url.length()}
		 * @throws Exception declared by the overridden method; not thrown explicitly here
		 */
		@Override
		public Integer map(Event value) throws Exception {
			final int urlLength = value.url.length();
			return urlLength;
		}

		/**
		 * Delegates to the superclass, then prints a shutdown message carrying
		 * this subtask's index.
		 *
		 * @throws Exception if the superclass hook fails
		 */
		@Override
		public void close() throws Exception {
			super.close();
			int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
			System.out.println("索引为 " + subtaskIndex + " 的任务结束");
		}
	}

}
